package org.huangrui.spark.scala.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SparkSession, functions}
import org.huangrui.spark.scala.sql.SparkSQL09_Source_Req_2.CityRemarkUDAF

/**
 * Spark SQL exercise: for every area, list the three most-clicked products,
 * together with the `cityRemark` aggregate of the city names behind those clicks.
 *
 * `cityRemark` is the `CityRemarkUDAF` imported from `SparkSQL09_Source_Req_2`.
 * The job reads `user_visit_action`, `product_info` and `city_info` from the
 * Hive database `db_spark` and prints the result with `show`.
 *
 * @Author hr
 * @Create 2024-10-21 11:21
 */
object SparkSQL09_Source_Req_3 {

  /**
   * Entry point. Builds the result as a chain of temporary views
   * (`t1` -> `t2` -> `t3`) and shows the final top-3 rows per area.
   *
   * The whole job runs inside `try`/`finally`, so `spark.stop()` is reached
   * even when a query or the UDAF registration throws.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Hadoop user name used when accessing HDFS / the Hive warehouse.
    System.setProperty("HADOOP_USER_NAME", "huangrui")
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")
    val spark: SparkSession = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate()
    try {
      spark.sql("use db_spark")

      // Step 1: base data - each click action joined with its product name and city/area.
      // NOTE(review): click_product_id = -1 presumably marks non-click actions; confirm.
      spark.sql(
        """
          |  select
          |     a.*,
          |     p.product_name,
          |     c.area,
          |     c.city_name
          |  from user_visit_action a
          |  join product_info p on a.click_product_id = p.product_id
          |  join city_info c on a.city_id = c.city_id
          |  where a.click_product_id > -1
          """.stripMargin).createOrReplaceTempView("t1")

      // Step 2: aggregate by (area, product): click count plus the cityRemark UDAF over city names.
      spark.udf.register("cityRemark", functions.udaf(new CityRemarkUDAF()))
      spark.sql(
        """
          |  select
          |     area,
          |     product_name,
          |     count(*) as clickCnt,
          |     cityRemark(city_name) as city_remark
          |  from t1 group by area, product_name
          """.stripMargin).createOrReplaceTempView("t2")

      // Step 3: rank products within each area by click count, highest first.
      // rank() gives tied rows the same rank, so an area can end up with more than 3 rows below.
      spark.sql(
        """
          |  select
          |      *,
          |      rank() over( partition by area order by clickCnt desc ) as rank
          |  from t2
          """.stripMargin).createOrReplaceTempView("t3")

      // Step 4: keep the top 3 ranks per area and print without truncating columns.
      spark.sql(
        """
          | select
          |     *
          | from t3 where rank <= 3
          """.stripMargin).show(false)
    } finally {
      // Always release the session, even if one of the steps above failed.
      spark.stop()
    }
  }
}
